
package com.shiku.imserver.repository;


import com.alibaba.fastjson.JSON;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoCredential;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.shiku.imserver.CoreService;
import com.shiku.imserver.common.IService;
import com.shiku.imserver.common.ImConfig;
import com.shiku.imserver.common.constant.KConstants;
import com.shiku.imserver.common.message.AbstractBaseMessage;
import com.shiku.imserver.common.message.ChatMessage;
import com.shiku.imserver.common.message.PullBatchGroupMessage;
import com.shiku.imserver.common.message.PullBatchGroupRespMessage;
import com.shiku.imserver.common.packets.ImPacket;
import com.shiku.imserver.common.proto.MessageProBuf;
import com.shiku.imserver.common.utils.ProBufUtils;
import com.shiku.imserver.service.IMBeanUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;


public class MessageRepositoryOld
        extends BaseRepository
        implements IService {
    private static Logger logger = LoggerFactory.getLogger(MessageRepositoryOld.class);


    private static final String MSGS_DB = "shiku_msgs";


    private static final String LASTCHAT_DB = "shiku_lastChats";


    private static final String LASTCHAT_MUC_COLLECTION = "lastChats_muc";


    private static final String MUCMsg_ = "mucmsg_";


    private static final String MESSAGE_OFFLINE = "message_offline";


    private MongoDatabase offMsgDB;


    private MongoDatabase chatMsgDB;


    private MongoDatabase lastMsgDB;


    private MongoDatabase mucdb;


    Queue<ChatMessage> queue = new LinkedBlockingQueue<>();


    MessageRepositoryThread thread;


    @Override
    public boolean initialize() {

        ImConfig.MongoConfig mongoConfig = IMBeanUtils.getImconfig().getMongoConfig();

        MongoClient mongoClient = IMBeanUtils.getBeanManager().getMongoClient();

        try {

            MongoClientURI uri = new MongoClientURI(mongoConfig.getUri());

            if (null != uri.getUsername()) {

                MongoCredential chatMsg_Credentials = MongoCredential.createCredential(uri.getUsername(), "shiku_msgs", uri.getPassword());

                MongoCredential lastMsg_Credentials = MongoCredential.createCredential(uri.getUsername(), "shiku_lastChats", uri.getPassword());

                mongoClient.getCredentialsList().add(chatMsg_Credentials);

                mongoClient.getCredentialsList().add(lastMsg_Credentials);

            }


            this.chatMsgDB = mongoClient.getDatabase("shiku_msgs");

            this.lastMsgDB = mongoClient.getDatabase("shiku_lastChats");


            if (null != uri.getUsername()) {

                MongoCredential imRoom_Credentials = MongoCredential.createCredential(uri.getURI(), "imRoom", uri.getPassword());

                mongoClient.getCredentialsList().add(imRoom_Credentials);

            }

            this.mucdb = mongoClient.getDatabase("imRoom");


            MongoCollection<Document> msgsCollection = null;


            msgsCollection = this.chatMsgDB.getCollection("shiku_msgs");

            msgsCollection.createIndex((Bson) new BasicDBObject("sender", Integer.valueOf(1)));

            msgsCollection.createIndex((Bson) new BasicDBObject("receiver", Integer.valueOf(1)));

            msgsCollection.createIndex((Bson) (new BasicDBObject("sender", Integer.valueOf(1))).append("receiver",
                    Integer.valueOf(1)));

            msgsCollection.createIndex((Bson) (new BasicDBObject("sender", Integer.valueOf(1))).append("receiver",
                    Integer.valueOf(1)).append("ts", Integer.valueOf(1)));


            MongoCollection<Document> lastMsgCollection = null;

            lastMsgCollection = this.lastMsgDB.getCollection("shiku_lastChats");


            lastMsgCollection.createIndex((Bson) new BasicDBObject("timeSend", Integer.valueOf(1)));


            this.offMsgDB = mongoClient.getDatabase("message_offline");

            this.thread = new MessageRepositoryThread();

            this.thread.start();

        } catch (Exception ex) {

            ex.printStackTrace();

            return false;

        }


        return true;

    }


    public void archiveMessage(ChatMessage model) {

        if (1 == model.getMessageHead().getChatType()) {

            storeChatMessage(model);

        } else if (2 == model.getMessageHead().getChatType()) {

            storeGroupMessage(model);

        }

    }


    private void storeChatMessage(ChatMessage model) {

        if (0 == model.getType()) {
            return;
        }

        if (26 == model.getType()) {


            IMBeanUtils.getMessageRepository()
                    .updateMsgReadStatus(Long.valueOf(model.getFromUserId()).longValue(), model.getContent());
            return;

        }

        if (5 == model.getType() / 100) {

            String from = model.getMessageHead().getFrom();

            String to = model.getMessageHead().getTo();

            String sender = CoreService.parseBareUserId(from);

            String receiver = CoreService.parseBareUserId(to);

            IMBeanUtils.getBeanManager().getNewFirendRepository()
                    .saveNewFirendRecord(sender, receiver, from, model.getType(), model.getContent());
            return;

        }

        if (9 == model.getType() / 100) {
            return;
        }

        if (120 < model.getType() && 202 != model.getType()) {

            return;

        }


        Document dbObj = new Document();

        dbObj.put("message", model.toString());


        dbObj.put("direction", Integer.valueOf(0));


        dbObj.put("sender", model.getFromUserId());

        dbObj.put("receiver", model.getToUserId());

        if (model.getFromUserId().equals(model.getToUserId())) {

            dbObj.put("sender", model.getMessageHead().getFrom());

            dbObj.put("receiver", model.getMessageHead().getTo());

        }

        dbObj.put("sender_jid", model.getMessageHead().getFrom());


        dbObj.put("receiver_jid", model.getMessageHead().getTo());


        dbObj.put("ts", Long.valueOf(System.currentTimeMillis()));


        dbObj.put("contentType", Short.valueOf(model.getType()));

        dbObj.put("messageId", model.getMessageHead().getMessageId());

        dbObj.put("timeSend", Long.valueOf(model.getTimeSend()));

        dbObj.put("deleteTime", Long.valueOf(model.getDeleteTime()));

        dbObj.put("isRead", Integer.valueOf(0));

        dbObj.put("isEncrypt", Boolean.valueOf(model.isEncrypt()));

        if (null != model.getContent()) {

            dbObj.put("content", model.getContent());

        }

        getCollection(this.chatMsgDB, Long.valueOf(model.getFromUserId()).longValue()).insertOne(dbObj);


        dbObj.replace("direction", Integer.valueOf(1));

        dbObj.replace("sender", model.getToUserId());

        dbObj.replace("receiver", model.getFromUserId());

        if (model.getFromUserId().equals(model.getToUserId())) {

            dbObj.put("sender", model.getMessageHead().getTo());

            dbObj.put("receiver", model.getMessageHead().getFrom());

        }

        dbObj.remove("_id");

        getCollection(this.chatMsgDB, Long.valueOf(model.getToUserId()).longValue()).insertOne(dbObj);


        LastChatModel lastChat = new LastChatModel();

        lastChat.setMessageId(model.getMessageHead().getMessageId());

        lastChat.setContent(model.getContent());

        lastChat.setUserId(model.getFromUserId());

        lastChat.setJid(model.getToUserId());

        lastChat.setIsRoom(0);

        lastChat.setType(model.getType());

        lastChat.setTimeSend(model.getTimeSend());

        lastChat.setEncrypt(model.isEncrypt());

        lastChat.setFrom(model.getMessageHead().getFrom());

        lastChat.setTo(model.getMessageHead().getTo());

        lastChat.setFromUserName(model.getFromUserName());

        lastChat.setToUserName(model.getToUserName());

        refreshLastChat(lastChat);

    }


    private void storeGroupMessage(ChatMessage model) {

        try {

            if (0 == model.getType() || 26 == model.getType()) {

                return;

            }


            String roomJid = model.getMessageHead().getTo();


            Document dbObj = new Document();

            dbObj.put("message", model.toString());

            dbObj.put("room_jid", roomJid);

            dbObj.put("sender_jid", model.getMessageHead().getFrom());

            dbObj.put("sender", model.getFromUserId());

            dbObj.put("ts", Long.valueOf(System.currentTimeMillis()));

            dbObj.put("contentType", Short.valueOf(model.getType()));

            dbObj.put("messageId", model.getMessageHead().getMessageId());

            dbObj.put("timeSend", Long.valueOf(model.getTimeSend()));

            dbObj.put("deleteTime", Long.valueOf(model.getDeleteTime()));

            if (null != model.getContent()) {

                dbObj.put("content", model.getContent());

            }

            this.mucdb.getCollection("mucmsg_" + roomJid).insertOne(dbObj);


            LastChatModel lastChat = new LastChatModel();

            lastChat.setMessageId(model.getMessageHead().getMessageId());

            lastChat.setContent(model.getContent());

            lastChat.setUserId(model.getFromUserId());

            lastChat.setJid(roomJid);

            lastChat.setIsRoom(1);

            lastChat.setType(model.getType());

            lastChat.setTimeSend(model.getTimeSend());

            lastChat.setEncrypt(model.isEncrypt());

            lastChat.setFrom(model.getMessageHead().getFrom());

            lastChat.setTo(model.getMessageHead().getTo());

            lastChat.setFromUserName(model.getFromUserName());

            lastChat.setToUserName(model.getToUserName());

            refreshLastChat(lastChat);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }


    public ChatMessage loadGroupMessage(String roomJid, String messageId) {

        Document query = new Document("messageId", messageId);

        MongoCollection<Document> collection = this.mucdb.getCollection("mucmsg_" + roomJid);

        Document document = (Document) collection.find((Bson) query).first();

        if (null == document) {
            return null;
        }

        return (ChatMessage) JSON.parseObject(document.getString("message"), ChatMessage.class);

    }


    public long loadGroupHistoryCount(String jid, long lastTime) {

        MongoCollection<Document> collection = this.mucdb.getCollection("mucmsg_" + jid);

        Document query = new Document("room_jid", jid);

        query.append("timeSend", new Document("$gt", Long.valueOf(lastTime)));


        return collection.count((Bson) query);

    }


    public void pullBatchGroupHistoryMessage(ChannelContext channelContext, PullBatchGroupMessage pullBatchGroupMessageReq) {

        if (null == pullBatchGroupMessageReq.getJidList()) {
            return;
        }

        String[] split = null;

        String jid = null;

        long lastTime = 0L;

        PullBatchGroupRespMessage groupHistoryMessage = null;

        ImPacket imPacket = null;

        long endTime = pullBatchGroupMessageReq.getEndTime();

        String from = pullBatchGroupMessageReq.getMessageHead().getFrom();

        try {

            for (String str : pullBatchGroupMessageReq.getJidList()) {

                if (channelContext.isClosed) {

                    logger.error(" channelContext isClosed => {} " + from);

                    break;

                }

                if (KConstants.isDebug) {

                    logger.info("pull message userId > {}  jid {}", from, str);

                }

                split = str.split(",");

                if (2 > split.length) {
                    continue;
                }

                jid = split[0];

                lastTime = Long.valueOf(split[1]).longValue();

                groupHistoryMessage = queryGroupHistoryMessage(jid, lastTime, endTime);

                if (null == groupHistoryMessage) {
                    continue;
                }

                groupHistoryMessage.setMessageId(pullBatchGroupMessageReq.getMessageHead().getMessageId());

                imPacket = ProBufUtils.encodeImPacket((AbstractBaseMessage) groupHistoryMessage, MessageProBuf.PullGroupMessageRespProBuf.getDescriptor());

                imPacket.setCommand((short) 15);

                CoreService.send(channelContext, imPacket);

            }

        } catch (Exception e) {

            logger.error(e.getMessage());

        }

    }


    public List<ChatMessage> loadGroupHistoryMessage(String jid, long seconds) {

        List<ChatMessage> result = new ArrayList<>();

        Document query = new Document("room_jid", jid);

        query.append("timeSend", new Document("$gt", Long.valueOf(System.currentTimeMillis() - seconds)));

        Document sort = new Document("timeSend", Integer.valueOf(-1));

        MongoCollection<Document> collection = this.mucdb.getCollection("mucmsg_" + jid);


        MongoCursor<Document> iterator = collection.find((Bson) query).sort((Bson) sort).limit(100).iterator();

        String messageStr = null;


        try {

            while (iterator.hasNext()) {

                try {

                    messageStr = ((Document) iterator.next()).getString("message");

                    ChatMessage message = (ChatMessage) JSON.parseObject(messageStr, ChatMessage.class);

                    if (null == message) {
                        continue;
                    }

                    result.add(message);

                } catch (Exception e) {

                    logger.error(" Exception {}", e.getMessage());

                }

            }

        } finally {

            if (null != iterator) {
                iterator.close();
            }

        }

        Collections.reverse(result);

        return result;

    }


    public PullBatchGroupRespMessage queryGroupHistoryMessage(String jid, long startTime, long endTime) {

        long count = 0L;

        int limit = 20;

        Document query = new Document("room_jid", jid);

        query.append("timeSend", (new Document("$gt", Long.valueOf(startTime))).append("$lt", Long.valueOf(endTime)));

        MongoCollection<Document> collection = this.mucdb.getCollection("mucmsg_" + jid);

        count = collection.count((Bson) query);

        if (0L == count) {
            return null;
        }

        if (count < limit) {
            limit = (int) count;
        }

        Document sort = new Document("timeSend", Integer.valueOf(-1));


        MongoCursor<Document> iterator = collection.find((Bson) query).sort((Bson) sort).limit(limit).iterator();

        List<ChatMessage> messageList = new ArrayList<>();

        String messageStr = null;

        try {

            while (iterator.hasNext()) {

                try {

                    messageStr = ((Document) iterator.next()).getString("message");

                    ChatMessage message = (ChatMessage) JSON.parseObject(messageStr, ChatMessage.class);

                    if (null == message) {
                        continue;
                    }

                    messageList.add(message);

                } catch (Exception e) {

                    logger.error(" Exception {}", e.getMessage());

                }

            }

        } finally {

            if (null != iterator) {
                iterator.close();
            }

        }

        Collections.reverse(messageList);

        PullBatchGroupRespMessage result = new PullBatchGroupRespMessage();

        result.setJid(jid);

        result.setCount(count);

        result.setMessageList(messageList);

        return result;

    }


    public List<ChatMessage> pullGroupHistoryMessage(String jid, int size, long startTime, long endTime) {

        List<ChatMessage> result = new ArrayList<>();

        Document query = new Document("room_jid", jid);

        MongoCollection<Document> collection = this.mucdb.getCollection("mucmsg_" + jid);

        if (0L < startTime) {
            query.append("timeSend", new Document("$gt", Long.valueOf(startTime)));
        }

        if (0L < endTime) {

            query.append("timeSend", new Document("$lt", Long.valueOf(endTime)));

        }

        Document sort = new Document("timeSend", Integer.valueOf(-1));


        MongoCursor<Document> iterator = collection.find((Bson) query).sort((Bson) sort).limit(size).iterator();

        String messageStr = null;


        try {

            while (iterator.hasNext()) {

                try {

                    messageStr = ((Document) iterator.next()).getString("message");

                    ChatMessage message = (ChatMessage) JSON.parseObject(messageStr, ChatMessage.class);

                    if (null == message) {
                        continue;
                    }

                    result.add(message);

                } catch (Exception e) {

                    logger.error(" Exception {}", e.getMessage());

                }

            }

        } finally {

            if (null != iterator) {
                iterator.close();
            }

        }

        Collections.reverse(result);

        return result;

    }


    public void refreshLastChat(LastChatModel model) {

        if (null == model.getJid()) {
            return;
        }

        MongoCollection<Document> collection = null;


        if (0 == model.getIsRoom()) {

            collection = getCollection(this.lastMsgDB, Long.valueOf(model.getUserId()).longValue());

        } else {

            collection = this.lastMsgDB.getCollection("lastChats_muc");

        }
        collection.find().first();

        Document query = new Document("jid", model.getJid());


        Document values = new Document("type", Integer.valueOf(model.getType()));

        values.append("messageId", model.getMessageId());

        values.append("timeSend", Long.valueOf(model.getTimeSend()));

        values.append("content", model.getContent());


        values.append("userId", model.getUserId());

        values.append("jid", model.getJid());

        values.append("isRoom", Integer.valueOf(model.getIsRoom()));

        values.append("isEncrypt", Boolean.valueOf(model.isEncrypt()));

        values.append("from", model.getFrom());

        values.append("to", model.getTo());

        values.append("fromUserName", model.getFromUserName());

        values.append("toUserName", model.getToUserName());

        if (1 != model.getIsRoom()) {

            query.append("userId", model.getUserId());

        }

        if (0L < collection.count((Bson) query)) {

            collection.updateMany((Bson) query, (Bson) new Document("$set", values));

        } else {

            collection.insertOne(values);

        }

        if (1 == model.getIsRoom() || model.getUserId().equals(model.getJid())) {
            return;
        }

        if (1 != model.getIsRoom()) {

            query.replace("userId", model.getJid());

            query.replace("jid", model.getUserId());


            values.replace("userId", model.getJid());

            values.replace("jid", model.getUserId());

        }

        values.remove("_id");

        collection = getCollection(this.lastMsgDB, Long.valueOf(model.getJid()).longValue());

        if (0L < collection.count((Bson) query)) {

            collection.updateMany((Bson) query, (Bson) new Document("$set", values));

        } else {

            collection.insertOne(values);

        }

    }


    public void updateMsgReadStatus(long sender, String msgId) {

        if (null == msgId) {
            return;
        }

        Document query = new Document("messageId", msgId);

        getCollection(this.chatMsgDB, sender).updateOne((Bson) query, (Bson) new Document("$set", new BasicDBObject("isRead", Integer.valueOf(1))));

    }


    public void deleteChatOffToJID(String userJid) {
    }


    public ChatMessage loadChatMessage(String userId, String messageId) {

        Document query = new Document("messageId", messageId);

        MongoCollection<Document> collection = getCollection(this.chatMsgDB, Integer.valueOf(userId).intValue());

        Document document = (Document) collection.find((Bson) query).first();

        if (null == document) {
            return null;
        }

        return (ChatMessage) JSON.parseObject(document.getString("message"), ChatMessage.class);

    }


    public List<ChatMessage> pullChatHistoryMessage(String userJid, String toJid, int size, long startTime, long endTime) {

        List<ChatMessage> result = new ArrayList<>();


        Document query = new Document("sender", userJid);

        query.append("receiver", toJid);

        if (0L < startTime) {
            query.append("timeSend", new Document("$gt", Long.valueOf(startTime)));
        }

        if (0L < endTime) {
            query.append("timeSend", new Document("$lt", Long.valueOf(endTime)));
        }

        Document sort = new Document("timeSend", Integer.valueOf(-1));

        MongoCollection<Document> collection = getCollection(this.chatMsgDB, Integer.valueOf(userJid).intValue());


        MongoCursor<Document> iterator = collection.find((Bson) query).sort((Bson) sort).limit(size).iterator();

        String messageStr = null;

        while (iterator.hasNext()) {

            try {

                messageStr = ((Document) iterator.next()).getString("message");

                ChatMessage message = (ChatMessage) JSON.parseObject(messageStr, ChatMessage.class);

                if (null == message) {
                    continue;
                }

                result.add(message);

            } catch (Exception e) {

                logger.error(" Exception {}", e.getMessage());

            }

        }


        Collections.reverse(result);

        return result;

    }


    public Queue<ChatMessage> loadChatOffToJID(String userJid, boolean delete) {

        Queue<ChatMessage> result = new LinkedList<>();

        Document query = new Document("receiver", userJid);

        Document sort = new Document("timeSend", Integer.valueOf(1));

        MongoCollection<Document> collection = getCollection(this.offMsgDB, Integer.valueOf(userJid).intValue());


        MongoCursor<Document> iterator = collection.find((Bson) query).sort((Bson) sort).iterator();

        String messageStr = null;


        try {

            while (iterator.hasNext()) {

                try {

                    messageStr = ((Document) iterator.next()).getString("message");

                    ChatMessage message = (ChatMessage) JSON.parseObject(messageStr, ChatMessage.class);

                    if (null == message) {
                        continue;
                    }

                    result.add(message);

                } catch (Exception e) {

                    logger.error(" Exception {}", e.getMessage());

                }

            }

        } finally {


            if (null != iterator) {
                iterator.close();
            }

        }

        collection.deleteMany((Bson) query);

        return result;

    }


    public void storeChatOffMessage(ChatMessage message) {

        try {

            Document dbObj = new Document();

            dbObj.put("message", message.toString());

            dbObj.put("to", message.getMessageHead().getTo());

            dbObj.put("from", message.getMessageHead().getFrom());

            dbObj.put("sender", message.getFromUserId());


            dbObj.put("receiver", message.getToUserId());

            dbObj.put("contentType", Short.valueOf(message.getType()));

            dbObj.put("messageId", message.getMessageHead().getMessageId());

            dbObj.put("timeSend", Long.valueOf(message.getTimeSend()));

            dbObj.put("content", message.getContent());

            getCollection(this.offMsgDB, Integer.valueOf(message.getToUserId()).intValue()).insertOne(dbObj);

        } catch (Exception e) {

            logger.error(e.getMessage());

        }

    }


    public boolean offerMessage(ChatMessage message) {

        return this.queue.offer(message);

    }


    public class MessageRepositoryThread
            extends Thread {

        @Override
        public void run() {

            while (true) {

                while (MessageRepositoryOld.this.queue.isEmpty()) {

                    try {

                        Thread.sleep(2000L);

                    } catch (Exception e) {

                        e.printStackTrace();

                    }

                }

                ChatMessage message = MessageRepositoryOld.this.queue.poll();

                if (1 == message.getMessageHead().getChatType()) {

                    MessageRepositoryOld.this.storeChatMessage(message);
                    continue;

                }
                if (2 == message.getMessageHead().getChatType()) {
                    MessageRepositoryOld.this.storeGroupMessage(message);
                }

            }

        }

    }

}


